[serverless] AWS LambdaのイベントソースにSQSを使う
Introduction
皆さんご存知のとおり、AWS Lambdaとはサーバーレスでプログラムを実行できる
環境を提供するAWSのサービスです。
実行したい処理を関数として定義すればそのまま実行できますが、
S3/DynamoDB/Kinesis/SNS/SQSなどをトリガーとしてLambdaを起動させることも可能です。
本稿ではSQSをトリガーとしてLambdaを実行させてみます。
(先日この仕組みで確認事項があり、試してみたのでそのアウトプット)
SQS + Lambdaの仕組み
仕組みは単純です。
イベントソースマッピングを行い、SQSとLambdaを関連付けるだけです。
Lambdaは自動的にスケールされ、設定により細かく制御可能です。
イベントソースマッピングは、現状標準キューとFIFOキューがサポートされています。
Lambda はキューをポーリングし、キューからメッセージを取得すると
Lambda関数を実行します。Lambda関数が正常に処理を完了すると、
キューから対象のメッセージを自動的に削除します。
Environment
- Python : 3.8.8
- Node : v16.2.0
- aws-cli : 2.2.5 Python/3.8.8
serverless?
今回はServerless Frameworkを使用します。
これはServerlessなアプリを構成管理デプロイするためのツールで、AWS Lambda以外にも
Google CloudFunctionsやAzure Functionに対応してます。
Setup & Create Example
npmでserverlessフレームワークをインストールします。
versionが表示できたらインストールOKです。
% npm install -g serverless % serverless --version Framework Core: 2.51.2 Plugin: 5.4.3 SDK: 4.2.4 Components: 3.13.3
そしてこのへんをみて
IAMユーザーの作成やらcredentialsの設定をします。
% sls create -t aws-python3 -p sqs-lambda Serverless: Generating boilerplate... Serverless: Generating boilerplate in "sqs-lambda" _______ __ | _ .-----.----.--.--.-----.----| .-----.-----.-----. | |___| -__| _| | | -__| _| | -__|__ --|__ --| |____ |_____|__| \___/|_____|__| |__|_____|_____|_____| | | | The Serverless Application Framework | | serverless.com, v2.51.2 -------' Serverless: Successfully generated boilerplate for template: "aws-python3"
deployコマンド一発でかんたんにデプロイできます。
% sls deploy Serverless: Packaging service... Serverless: Excluding development dependencies... Serverless: Uploading CloudFormation file to S3... Serverless: Uploading artifacts... Serverless: Uploading service sqs-lambda.zip file to S3 (748 B)... .............
デプロイした関数をテスト。
ちゃんと動作してますね。
% sls invoke -f hello { "statusCode": 200, "body": "{\"message\": \"Go Serverless v1.0! Your function executed successfully!\", \"input\": {}}" }
ではserverless.ymlでSQSをイベントソースにしてLambdaを起動するようにしてみます。
(必要なところだけ抜粋)
IAMの設定でSQSのアクセスを許可。
EventSourceQueueとはResource部分で作成するキューを参照しています。
・・・ iam: role: statements: - Effect: "Allow" Action: - "sqs:SendMessage" - "sqs:GetQueueUrl" Resource: "arn:aws:sqs:${self:provider.region}:<AWSアカウントID>:EventSourceQueue" - Effect: "Allow" Action: - "sqs:ListQueues" Resource: "arn:aws:sqs:${self:provider.region}:<AWSアカウントID>:*" ・・・
ResourcesでSQSキューを作成し、eventsでそのキューを指定します。
・・・ functions: hello: handler: handler.sqs_lambda events: - sqs: arn: Fn::GetAtt: - EventSourceQueue - Arn resources: Resources: EventSourceQueue: Type: "AWS::SQS::Queue" Properties: QueueName: "EventSourceQueue"
設定ファイルができたらLambdaプログラムを記述します。
メッセージはeventから送られてくるので、eventからメッセージを取り出します。
メッセージ数は、最大10件取得することが可能です。
※設定やタイミングに依存する
import json def sqs_lambda(event, context): body = { "messages": [], "input": event } for record in event['Records']: body['messages'].append(record["body"]) response = { "statusCode": 200, "body": json.dumps(body) } print(response["body"]) return response
プログラムの記述ができたらdeployします。
% sls deploy
CLIで、さきほど作成したキューにメッセージを送ります。
% aws sqs send-message --queue-url https://us-east-1.queue.amazonaws.com/<AWSアカウントID>/EventSourceQueue --message-body "Foo"
LambdaからCloudwatchログを確認すると、
Lambdaが起動してメッセージを取得しているのがわかります。
その後、使い終わったら削除しておきましょう。
% sls remove
setting SQS + Lambda
Lambdaは、SQSに到着するメッセージの数に応じて(リミットまで)
自動的にスケーリングします。
任意のパラメータを設定することでスケーリングの制限をおこなったり
同時実行数を制御できたりします。
これらのパラメータもserverles.ymlで指定可能です。
SQS parameters
SQS側で設定できるパラメータについていくつか紹介します。
-
receiveMessageWaitTimeSeconds
Lambdaがメッセージをポーリングして応答を返す前に待機する時間(1〜20秒)です。
0を指定すればショートポーリングになります。 -
visibilityTimeout
メッセージがキューから読み取られた後、
メッセージがconsumerに表示されない時間の長さです。
この値は、Lambda関数に設定したタイムアウトの6倍以上推奨とのことです。 -
maxReceiveCount
メッセージをDeadLetterQueue(処理できなかったメッセージが送られるキュー) に移動する前に、送信元キューに表示されるように配信できる回数です。
5以上推奨とのこと。 -
maximumBatchingWindow
Lamba関数を起動する前に、SQSのレコードを集めるための最大時間です。
最大300秒まで設定可能です。
Lambda parameters
-
reservedConcurrency
予約済みの同時実行制限です。
同時に実行できるLambda関数の実行数を設定します。
5以上推奨とのこと。 -
batchSize
Lambdaが1回のバッチでSQSキューから取得するメッセージの最大数です。
※指定しても、必ず最大数とれるとは限らない